package com.kool.kmqtt.server.repository.etcd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.kool.kmqtt.server.ServerConfig;
import com.kool.kmqtt.server.exception.AppException;
import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.repository.etcd.vo.*;
import com.kool.kmqtt.util.HttpUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.codec.binary.Base64;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * etcd client built on the etcd v3 HTTP/JSON endpoints ({@code /v3/kv/*}, {@code /v3/lease/grant}).
 *
 * <p>Keys and values are sent Base64-encoded and are decoded back to UTF-8 strings before being
 * handed to callers. Every HTTP or parsing failure is logged and rethrown as an
 * {@link AppException} carrying {@link ErrorCode#ETCD_ERROR}.
 */
@Slf4j
public class RestClient {
    /**
     * Value passed as both trailing numeric arguments of {@code HttpUtil.doPost}
     * (presumably connect/read timeouts in milliseconds — confirm against HttpUtil).
     */
    private static final int HTTP_TIMEOUT = 5000;

    /**
     * Stores a key/value pair.
     *
     * @param key   plain-text key
     * @param value plain-text value
     * @throws AppException if the request fails
     */
    public void put(String key, String value) {
        PutRequest request = new PutRequest();
        request.setKey(toBase64(key));
        request.setValue(toBase64(value));
        put(request);
    }

    /**
     * Stores a key/value pair attached to a freshly granted lease, so that the pair expires
     * automatically.
     *
     * @param key            plain-text key
     * @param value          plain-text value
     * @param timeoutSeconds lease TTL in seconds
     * @throws AppException if granting the lease or storing the pair fails
     */
    public void put(String key, String value, int timeoutSeconds) {
        GrantRequest grantRequest = new GrantRequest();
        grantRequest.setTtl(Integer.toString(timeoutSeconds));
        String leaseId = leaseGrant(grantRequest);

        PutRequest putRequest = new PutRequest();
        putRequest.setKey(toBase64(key));
        putRequest.setValue(toBase64(value));
        // The lease field is the numeric lease id returned by the grant call, not a byte
        // string: it must be forwarded unchanged. Base64-encoding it makes the id unparseable.
        putRequest.setLease(leaseId);
        put(putRequest);
    }

    /**
     * Requests a new lease and returns its id.
     *
     * @param request grant request carrying the TTL
     * @return the lease id reported by etcd, never {@code null} or empty
     * @throws AppException if the request fails or the response carries no lease id
     */
    private String leaseGrant(GrantRequest request) {
        String url = endpoint("/v3/lease/grant");
        String requestString = JSON.toJSONString(request);
        String responseString = null;
        String leaseId;
        try {
            log.debug("etcd lease/Grant request, params = {}", requestString);
            responseString = HttpUtil.doPost(url, requestString, null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd lease/Grant response = {}", responseString);
            GrantResponse grantResponse = JSON.parseObject(responseString, GrantResponse.class);
            leaseId = grantResponse.getId();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
        // Without an id the following put would silently create a key that never expires.
        if (leaseId == null || leaseId.isEmpty()) {
            throw new AppException(ErrorCode.ETCD_ERROR,
                    "etcd lease grant returned no lease id, response = " + responseString);
        }
        return leaseId;
    }

    /**
     * Sends an already encoded put request. The response body is only logged.
     *
     * @param request put request whose key/value are Base64-encoded
     * @throws AppException if the request fails
     */
    private void put(PutRequest request) {
        String url = endpoint("/v3/kv/put");
        String requestString = JSON.toJSONString(request);
        String responseString = null;
        try {
            log.debug("etcd put request, params = {}", requestString);
            responseString = HttpUtil.doPost(url, requestString, null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd put response = {}", responseString);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
    }

    /**
     * Fetches the key/value pair stored under {@code key}.
     *
     * @param key plain-text key
     * @return the pair with key and value decoded to plain text, or {@code null} when the
     *         response contains no pair
     * @throws AppException if the request or response parsing fails
     */
    public KV get(String key) {
        String url = endpoint("/v3/kv/range");
        JSONObject params = new JSONObject();
        params.put("key", toBase64(key));
        String responseString = null;
        try {
            log.debug("etcd get request, params = {}", params.toJSONString());
            responseString = HttpUtil.doPost(url, params.toJSONString(), null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd get rangeResponse = {}", responseString);
            RangeResponse rangeResponse = JSON.parseObject(responseString, RangeResponse.class);
            List<KV> kvs = rangeResponse.getKvs();
            if (kvs != null && !kvs.isEmpty()) {
                return decode(kvs.get(0));
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
        return null;
    }

    /**
     * Fetches every key/value pair whose key starts with {@code keyPrefix}.
     *
     * @param keyPrefix non-empty plain-text key prefix
     * @return the matching pairs decoded to plain text, or {@code null} when nothing matched
     * @throws IllegalArgumentException if {@code keyPrefix} is {@code null} or empty
     * @throws AppException             if the request or response parsing fails
     */
    public List<KV> getByPrefix(String keyPrefix) {
        String url = endpoint("/v3/kv/range");
        JSONObject params = prefixParams(keyPrefix);
        String responseString = null;
        try {
            log.debug("etcd getByPrefix request, params = {}", params.toJSONString());
            responseString = HttpUtil.doPost(url, params.toJSONString(), null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd getByPrefix rangeResponse = {}", responseString);
            RangeResponse rangeResponse = JSON.parseObject(responseString, RangeResponse.class);
            List<KV> kvs = rangeResponse.getKvs();
            if (kvs != null && !kvs.isEmpty()) {
                return decodeAll(kvs);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
        return null;
    }

    /**
     * Deletes the pair stored under {@code key} and asks etcd to report what was removed.
     *
     * @param key plain-text key
     * @return the removed pairs decoded to plain text, or {@code null} when the response reports none
     * @throws AppException if the request or response parsing fails
     */
    public List<KV> delete(String key) {
        String url = endpoint("/v3/kv/deleterange");
        JSONObject params = new JSONObject();
        params.put("key", toBase64(key));
        params.put("prev_kv", true);

        String responseString = null;
        try {
            log.debug("etcd delete request, params = {}", params.toJSONString());
            responseString = HttpUtil.doPost(url, params.toJSONString(), null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd delete rangeResponse = {}", responseString);
            List<KV> kvs = removedKvs(responseString);
            if (kvs != null && !kvs.isEmpty()) {
                return decodeAll(kvs);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
        return null;
    }

    /**
     * Deletes every pair whose key starts with {@code keyPrefix} and asks etcd to report what
     * was removed.
     *
     * @param keyPrefix non-empty plain-text key prefix
     * @return the removed pairs decoded to plain text, or {@code null} when the response reports none
     * @throws IllegalArgumentException if {@code keyPrefix} is {@code null} or empty
     * @throws AppException             if the request or response parsing fails
     */
    public List<KV> deleteByPrefix(String keyPrefix) {
        String url = endpoint("/v3/kv/deleterange");
        JSONObject params = prefixParams(keyPrefix);
        params.put("prev_kv", true);
        String responseString = null;
        try {
            log.debug("etcd delete request, params = {}", params.toJSONString());
            responseString = HttpUtil.doPost(url, params.toJSONString(), null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd delete rangeResponse = {}", responseString);
            List<KV> kvs = removedKvs(responseString);
            if (kvs != null && !kvs.isEmpty()) {
                return decodeAll(kvs);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
        return null;
    }

    /**
     * Transactional write: submits {@code oldKv} as the compare clause and, on success, a put
     * of {@code newKv}.
     *
     * @param oldKv pair to compare against; key and value already Base64-encoded
     * @param newKv pair to write; key and value already Base64-encoded
     * @return {@code true} only when the response explicitly reports success
     * @throws AppException if the request or response parsing fails
     */
    public boolean txn(KV oldKv, KV newKv) {
        String url = endpoint("/v3/kv/txn");

        List<KV> compare = new ArrayList<>();
        compare.add(oldKv);

        PutRequest putRequest = new PutRequest();
        putRequest.setKey(newKv.getKey());
        putRequest.setValue(newKv.getValue());

        TxnAfterCompareRequest txnAfterCompareRequest = new TxnAfterCompareRequest();
        txnAfterCompareRequest.setRequestPut(putRequest);

        List<TxnAfterCompareRequest> txnAfterCompareRequests = new ArrayList<>();
        txnAfterCompareRequests.add(txnAfterCompareRequest);

        TxnRequest request = new TxnRequest();
        request.setCompare(compare);
        request.setSuccess(txnAfterCompareRequests);

        String requestString = JSON.toJSONString(request);
        String responseString = null;
        try {
            log.debug("etcd txn request, params = {}", requestString);
            responseString = HttpUtil.doPost(url, requestString, null, HTTP_TIMEOUT, HTTP_TIMEOUT);
            log.debug("etcd txn rangeResponse = {}", responseString);
            TxnResponse txnResponse = JSON.parseObject(responseString, TxnResponse.class);
            // A response that omits the flag must read as "not succeeded" rather than blow up
            // on unboxing a null.
            return Boolean.TRUE.equals(txnResponse.getSucceeded());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new AppException(ErrorCode.ETCD_ERROR, e.getMessage());
        }
    }

    /**
     * Increments the numeric value stored under {@code key} by one. Does nothing when the key
     * does not exist. The read-then-{@link #txn(KV, KV)} cycle is repeated until the
     * transaction reports success.
     *
     * @param key plain-text key whose value parses as a {@code long}
     * @throws NumberFormatException if the stored value is not a valid {@code long}
     * @throws AppException          if any etcd request fails
     */
    public void inrc(String key) {
        KV current;
        KV updated;
        do {
            // Read the current state first.
            current = get(key);
            if (current == null) {
                return;
            }

            long value = Long.parseLong(current.getValue());
            // get() returns plain text; txn() expects Base64, so re-encode in place.
            current.setKey(toBase64(current.getKey()));
            current.setValue(toBase64(current.getValue()));

            updated = new KV();
            updated.setKey(current.getKey());
            updated.setValue(toBase64(Long.toString(value + 1L)));
        } while (!txn(current, updated));
    }

    /** Builds the full URL of an etcd endpoint from the configured address and {@code path}. */
    private static String endpoint(String path) {
        return "http://" + ServerConfig.getInstance().getEtcdAddress() + path;
    }

    /** Encodes {@code text} as Base64 over its UTF-8 bytes (the charset used when decoding). */
    private static String toBase64(String text) {
        return Base64.encodeBase64String(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a Base64 string to UTF-8 text; {@code null} stays {@code null}. */
    private static String fromBase64(String encoded) {
        if (encoded == null) {
            return null;
        }
        byte[] raw = Base64.decodeBase64(encoded.getBytes(StandardCharsets.UTF_8));
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Replaces the Base64 key and value of {@code kv} with their plain-text form, in place. */
    private static KV decode(KV kv) {
        kv.setKey(fromBase64(kv.getKey()));
        kv.setValue(fromBase64(kv.getValue()));
        return kv;
    }

    /** Decodes every pair in place and returns them in a new list. */
    private static List<KV> decodeAll(List<KV> kvs) {
        List<KV> decoded = new ArrayList<>(kvs.size());
        for (KV kv : kvs) {
            decoded.add(decode(kv));
        }
        return decoded;
    }

    /**
     * Builds the {@code key}/{@code range_end} parameters selecting every key that starts with
     * {@code keyPrefix}. An empty prefix is rejected so that it can never turn into a
     * match-everything range (which would be destructive for deletes).
     */
    private static JSONObject prefixParams(String keyPrefix) {
        if (keyPrefix == null || keyPrefix.isEmpty()) {
            throw new IllegalArgumentException("keyPrefix must not be null or empty");
        }
        byte[] prefixBytes = keyPrefix.getBytes(StandardCharsets.UTF_8);
        JSONObject params = new JSONObject();
        params.put("key", Base64.encodeBase64String(prefixBytes));
        params.put("range_end", Base64.encodeBase64String(prefixRangeEnd(prefixBytes)));
        return params;
    }

    /**
     * Computes the exclusive upper bound of the key range sharing {@code prefix}: trailing
     * 0xFF bytes are dropped and the last remaining byte is incremented. When no byte can be
     * incremented the single byte 0x00 is returned, which by etcd convention means
     * "no upper bound".
     */
    private static byte[] prefixRangeEnd(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] end = new byte[i + 1];
                System.arraycopy(prefix, 0, end, 0, i + 1);
                end[i]++;
                return end;
            }
        }
        return new byte[]{0};
    }

    /**
     * Extracts the removed pairs from a delete-range response. The {@code kvs} field mapped by
     * {@link RangeResponse} is honoured first; when it is absent the {@code prev_kvs} field,
     * under which etcd reports deleted pairs, is read from the raw JSON instead.
     */
    private static List<KV> removedKvs(String responseString) {
        RangeResponse rangeResponse = JSON.parseObject(responseString, RangeResponse.class);
        List<KV> kvs = rangeResponse.getKvs();
        if (kvs == null) {
            kvs = JSON.parseArray(JSON.parseObject(responseString).getString("prev_kvs"), KV.class);
        }
        return kvs;
    }
}
